package kafka

import (
	"encoding/json"
	"github.com/Shopify/sarama"
	"gitlab-ce.k8s.tools.vchangyi.com/common/go-toolbox/ctx"
	"gitlab-ce.k8s.tools.vchangyi.com/common/go-toolbox/log"
)

// appKafka pairs a sarama.SyncProducer with the topic that Producer sends to.
type appKafka struct {
	// topic is the destination topic used by Producer.
	topic    string
	// producer sends the messages and is shut down by Close.
	producer sarama.SyncProducer
}

// response holds a partition/offset pair with JSON field tags.
//
// NOTE(review): "Patition" and its "patition" tag look like a misspelling of
// "partition". The tag is part of the encoded JSON, so confirm with any
// consumers before renaming. The type is not referenced in the visible part
// of this file — presumably it is used elsewhere in the package; verify.
type response struct {
	Patition int32 `json:"patition"`
	Offset   int64 `json:"offset"`
}

// NewAppKafka is the constructor for appKafka: it stores the given topic and
// producer and returns a pointer to the new value.
func NewAppKafka(topic string, producer sarama.SyncProducer) *appKafka {
	client := new(appKafka)
	client.topic = topic
	client.producer = producer
	return client
}

// Producer sends data as a single message to the topic configured on ak.
//
// Every message is sent with the fixed key "key". The partition and offset
// reported by SendMessage are discarded; only the send error, if any, is
// returned.
func (ak appKafka) Producer(data string) error {
	var message sarama.ProducerMessage
	message.Topic = ak.topic
	message.Key = sarama.StringEncoder("key")
	message.Value = sarama.ByteEncoder(data)

	// SendMessage produces the given message: on success it reports the
	// message's partition and offset, on failure it reports an error.
	if _, _, sendErr := ak.producer.SendMessage(&message); sendErr != nil {
		return sendErr
	}
	return nil
}

// Close closes the underlying producer and returns whatever error that
// close reports.
func (ak appKafka) Close() error {
	closeErr := ak.producer.Close()
	return closeErr
}

// ProducerWithTopic JSON-encodes data and pushes it to the given topic rather
// than the topic configured on ak.
//
// The encoded payload is logged before sending, and a failed send is logged
// as well. Every message is sent with the fixed key "key".
//
// It returns the partition and offset reported by SendMessage together with
// the send error. If data cannot be marshalled, nothing is sent and it
// returns 0, 0 and the marshalling error.
func (ak appKafka) ProducerWithTopic(topic string, data interface{}) (int32, int64, error) {
	payload, err := json.Marshal(data)
	if err != nil {
		return 0, 0, err
	}
	log.Info(ctx.New()).Msgf("kafka-producer-with-topic-info:%s,%s", topic, payload)

	msg := &sarama.ProducerMessage{
		Topic: topic,
		Key:   sarama.StringEncoder("key"),
		// payload is already a []byte, so it is converted directly instead of
		// being copied through an intermediate string.
		Value: sarama.ByteEncoder(payload),
	}

	// SendMessage produces the given message: on success it reports the
	// message's partition and offset, on failure it reports an error.
	partition, offset, err := ak.producer.SendMessage(msg)
	if err != nil {
		log.Error(ctx.New()).Msgf("kafka-producer-with-topic-err:%s,%s,%+v", topic, payload, err)
	}
	return partition, offset, err
}
